跳到主要内容

Go 语言并发学习-sync 包-协程组管理

WaitGroup 等待一组协程

官方文档对 WaitGroup 的描述是:一个 WaitGroup 对象可以等待一组协程结束。其实就是 Java 中的 CountDownLatch

  1. main 协程通过调用 wg.Add(delta int) 设置 worker 协程的个数,然后创建 worker 协程;
  2. worker 协程执行结束以后,都要调用 wg.Done()
  3. main 协程调用 wg.Wait() 且被 block,直到所有 worker 协程全部执行结束后返回。

使用例如下:

// src/cmd/compile/internal/ssa/gen/main.go
func main() {
// 省略部分代码 ...
var wg sync.WaitGroup

for _, task := range tasks {
task := task
wg.Add(1)
go func() {
task()
wg.Done()
}()
}

wg.Wait()
// 省略部分代码...
}

条件变量 sync.Cond

sync.Cond 是什么?

sync.Cond 经常用在多个 goroutine 等待,一个 goroutine 通知(事件发生)的场景。如果是一个通知,一个等待,使用互斥锁或 channel 就能搞定了。

我们想象一个非常简单的场景:

有一个协程在异步地接收数据,剩下的多个协程必须等待这个协程接收完数据,才能读取到正确的数据。在这种情况下,如果单纯使用 chan 或互斥锁,那么只能有一个协程可以等待,并读取到数据,没办法通知其他的协程也读取数据。

这个时候,就需要 有个全局的变量来标志第一个协程数据是否接受完毕,剩下的协程,反复检查该变量的值,直到满足要求。或者创建多个 channel,每个协程阻塞在一个 channel 上,由接收数据的协程在数据接收完毕后,逐个通知。总之,需要额外的复杂度来完成这件事。

Go 语言在标准库 sync 中内置一个 sync.Cond 用来解决这类问题。

sync.Cond 的定义如下:

type Cond struct {
noCopy noCopy

// L is held while observing or changing the condition
L Locker

notify notifyList
checker copyChecker
}

每个 Cond 实例都会关联一个锁 L(互斥锁 *Mutex,或读写锁 *RWMutex),当修改条件或者调用 Wait 方法时,必须加锁。

sync.Cond 的方法 ⭐

sync.Cond 相关的有如下几个方法:

// NewCond 创建 Cond 实例时,需要关联一个锁。
func NewCond(l Locker) *Cond

// Broadcast 广播唤醒所有,Broadcast 唤醒所有等待条件变量 c 的 goroutine,无需锁保护。
func (c *Cond) Broadcast()

// Signal 唤醒一个协程,Signal 只唤醒任意 1 个等待条件变量 c 的 goroutine,无需锁保护。
func (c *Cond) Signal()

// Wait 等待
func (c *Cond) Wait()

调用 Wait 会自动释放锁 c.L,并挂起调用者所在的 goroutine,因此当前协程会阻塞在 Wait 方法调用的地方。如果其他协程调用了 Signal 或 Broadcast 唤醒了该协程,那么 Wait 方法在结束阻塞时,会重新给 c.L 加锁,并且继续执行 Wait 后面的代码。

对条件的检查,使用了 for 而非 if,是因为当前协程被唤醒时,条件不一定符合要求,需要再次 Wait 等待下次被唤醒。为了保险起见,使用 for 能够确保条件符合要求后,再执行后续的代码。

c.L.Lock() // 先取得了锁
for !condition() {
c.Wait() // 暂时释放锁,得到通知后再释放锁
}


... make use of condition ...
c.L.Unlock() // 解锁

使用示例(广播)

接下来我们实现一个简单的例子,三个协程调用 Wait() 等待,另一个协程调用 Broadcast() 唤醒所有等待的协程。

var done = false

func read(name string, c *sync.Cond) {
c.L.Lock()
for !done {
c.Wait()
}
log.Println(name, "starts reading")
c.L.Unlock()
}

func write(name string, c *sync.Cond) {
log.Println(name, "starts writing")
time.Sleep(time.Second)
c.L.Lock()
done = true
c.L.Unlock()
log.Println(name, "wakes all")
c.Broadcast()
}

func main() {
cond := sync.NewCond(&sync.Mutex{})

go read("reader1", cond)
go read("reader2", cond)
go read("reader3", cond)
// 等待一秒通知
write("writer", cond)

time.Sleep(time.Second * 3)
}

打印结果:

2021/11/28 21:49:14 writer starts writing
2021/11/28 21:49:15 writer wakes all
2021/11/28 21:49:15 reader3 starts reading
2021/11/28 21:49:15 reader1 starts reading
2021/11/28 21:49:15 reader2 starts reading

errgroup 包

errgroup 包是一个用来捕获一组协程中产生的异常的工具

使用场景:例如需要返回错误,或者只要一个 goroutine 出错我们就不再等其他 goroutine 了,减少资源浪费

使用 waitGroup 可以实现一个 goroutine 等待一组 goroutine 干活结束,更好的实现了任务同步,但是 waitGroup 却无法返回错误,当一组 Goroutine 中的某个 goroutine 出错时,我们是无法感知到的,所以 errGroup 对 waitGroup 进行了一层封装

提示

使用它的时候无需再使用 waitGroup,它内部已经封装了

使用栗子

package main

import (
"fmt"
"net/http"

"golang.org/x/sync/errgroup"
)

func main() {
g := new(errgroup.Group)
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}

for _, url := range urls {
// Launch a goroutine to fetch the URL.
url := url // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
// Fetch the URL.
resp, err := http.Get(url)
if err == nil {
resp.Body.Close()
}
return err
})
}

// Wait for all HTTP fetches to complete.
if err := g.Wait(); err == nil {
fmt.Println("Successfully fetched all URLs.")
}
}

请求中只要有一个有错误就会返回 error(它会自动调用 g.wg.Done() 释放),且给后面的所有的子 Context 发送关闭信号,而且 g.Wait() 会返回第一个 err

Reference

sync 包(二):条件变量 sync.Cond 极客兔兔 条件变量 sync.Cond